package cn.itcast.flink.base.test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/7/28 12:03
 * Desc Event-time watermark demo: a simulated, out-of-order order stream is
 * stamped with event-time timestamps and bounded-out-of-orderness watermarks,
 * then the total order amount per user is computed over 5-second tumbling
 * event-time windows (with allowed lateness) and printed.
 */
public class WatermarkDemo {

    /** Maximum out-of-orderness tolerated by the watermark generator. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(3);

    /** Size of the tumbling event-time window. */
    private static final Time WINDOW_SIZE = Time.seconds(5);

    /** Extra time a window accepts late elements after the watermark has passed its end. */
    private static final Time ALLOWED_LATENESS = Time.seconds(3);

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: simulated real-time orders whose event times are delayed and out of order
        DataStream<Order> orderDS = env.addSource(new OrderSource());

        // 3. Transformation
        // Watermark = current max event time - max allowed out-of-orderness.
        // The timestamp assigner is required: the source emits records without
        // timestamps, so Flink must be told that Order.eventTime is the event time.
        // Requirement: every 5s, compute each user's total order amount over the
        // last 5s (tumbling event-time window).
        SingleOutputStreamOperator<Order> result = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                                .withTimestampAssigner((order, recordTimestamp) -> order.getEventTime()))
                .keyBy(order -> order.getUserId())
                .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                .allowedLateness(ALLOWED_LATENESS)
                .sum("money");

        // 4. Sink
        result.print();

        // 5. execute
        env.execute();
    }

    /**
     * Source that emits one random {@link Order} per second until cancelled.
     * Each order's event time is the current wall-clock time minus a random
     * delay of 0-4 seconds, which simulates delayed and out-of-order data.
     */
    private static final class OrderSource implements SourceFunction<Order> {
        private static final long serialVersionUID = 1L;

        // volatile: written by cancel() and read by the run() loop, which are
        // normally invoked from different threads.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (running) {
                String orderId = UUID.randomUUID().toString();
                int userId = random.nextInt(3);
                int money = random.nextInt(100);
                // Simulate delay and disorder: push the event time 0-4 seconds into the past.
                long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000L;
                ctx.collect(new Order(orderId, userId, money, eventTime));

                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Order record flowing through the job. Accessors and constructors are
     * generated by Lombok.
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        /** Unique order identifier (a random UUID string in this demo). */
        private String orderId;
        /** Identifier of the user who placed the order; used as the stream key. */
        private Integer userId;
        /** Order amount; summed per user and window. */
        private Integer money;
        /** Event time in epoch milliseconds; used as the record timestamp. */
        private Long eventTime;
    }
}
